package com.uxsino.reactorq.clientserver;

import java.io.IOException;
import java.util.function.Function;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.BlobMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uxsino.reactorq.commons.IWithAttachmentStream;
import com.uxsino.reactorq.commons.Response;

public class JMSSyncMultiSessionServer<ArgType> implements javax.jms.MessageListener {
    private static Logger logger = LoggerFactory.getLogger(JMSSyncMultiSessionServer.class);

    private static final int DEFAULT_SESSION_NUM = 5;

    private static final int TIME_TO_LIVE = 60 * 60 * 1000;

    private QueueConnection connection;

    private QueueSession srcSession;

    private MessageConsumer receiver;

    private MessageProducer replier;

    private Queue srcQueue;

    private QueueSession[] subSessions;

    private MessageConsumer[] consumers;

    private MessageProducer[] producers;

    private Queue[] queues;

    private java.util.Queue<Integer> availableSessions;

    private String queueName;

    private ObjectMapper mapper = new ObjectMapper();

    private Function<ArgType, ?> handler;

    private Class<ArgType> eventType;

    public JMSSyncMultiSessionServer(QueueConnection connection, String queueName, String receiverId,
        Class<ArgType> cls, Function<ArgType, ?> handler, int sessionNum) throws JMSException {
        this.eventType = cls;
        this.handler = handler;
        this.connection = connection;
        this.queueName = queueName;

        if (connection != null) {
            createReceiver(receiverId, sessionNum);
        }
    }

    private void createReceiver(String receiverId, int consumerNum) throws JMSException {
        srcSession = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
        srcQueue = srcSession.createQueue(queueName);
        String selector = null;
        if (receiverId != null) {
            selector = "receiverId = '" + receiverId + "'";
            // String selector = "receiverId IS NULL";// OR receiverId = '" + receiverId + "'";
            receiver = srcSession.createConsumer(srcQueue, selector);
        } else {
            receiver = srcSession.createConsumer(srcQueue);
        }
        receiver.setMessageListener(this);
        replier = srcSession.createProducer(null);
        replier.setTimeToLive(TIME_TO_LIVE);
        availableSessions = new java.util.concurrent.ConcurrentLinkedQueue<Integer>();

        int size = consumerNum <= 0 ? DEFAULT_SESSION_NUM : consumerNum;
        consumers = new MessageConsumer[size];
        queues = new Queue[size];
        subSessions = new QueueSession[size];
        producers = new MessageProducer[size];

        for (int i = 0; i < size; i++) {
            subSessions[i] = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            queues[i] = subSessions[i].createQueue(queueName + "_subqueue_" + i);
            producers[i] = subSessions[i].createProducer(null);
            producers[i].setTimeToLive(TIME_TO_LIVE);
            if (selector != null) {
                consumers[i] = subSessions[i].createConsumer(queues[i], selector);
            } else {
                consumers[i] = subSessions[i].createConsumer(queues[i]);
            }
            consumers[i].setMessageListener(new SubQueueListener(i, subSessions[i], producers[i]));
            availableSessions.add(i);
        }
    }

    @Override
    public void onMessage(Message message) {
        while (availableSessions.isEmpty()) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("sleep srcSession is interrupted", e);
            }
        }
        int index = availableSessions.poll();
        try {
            replier.send(queues[index], message);
        } catch (JMSException e) {
            logger.error("sending message to subqueue error ", e);
            availableSessions.add(index);
        }
        try {
            srcSession.commit();
        } catch (JMSException e) {
            logger.error("srcSession commit error ", e);
        }

    }

    public class SubQueueListener implements javax.jms.MessageListener {

        private int id;

        private QueueSession session;

        private MessageProducer producer;

        public SubQueueListener(int id, QueueSession s, MessageProducer p) {
            this.id = id;
            this.session = s;
            this.producer = p;
        }

        @Override
        public void onMessage(Message message) {
            try {

                try {
                    Response response = new Response();

                    if (handler != null) {
                        try {
                            ArgType ev = mapper.readValue(getMessageText(message), eventType);

                            if (ev instanceof IWithAttachmentStream && message instanceof BlobMessage) {
                                try {
                                    ((IWithAttachmentStream) ev)
                                        .setAttachement(((BlobMessage) message).getInputStream());
                                } catch (IOException | JMSException e) {
                                    logger.error("error getting attachment");
                                }
                            }
                            Object r = handler.apply(ev);
                            response.j = mapper.writeValueAsString(r);
                            response.status = Response.STATUS_SUCCESS;
                        } catch (Exception e) {
                            response.status = Response.STATUS_ERROR;
                            response.j = e.getMessage();
                        }
                    }
                    // Check for a ReplyTo Queue
                    javax.jms.Queue replyQueue = (javax.jms.Queue) message.getJMSReplyTo();

                    if (replyQueue != null) {
                        javax.jms.TextMessage reply = session.createTextMessage();
                        try {
                            reply.setText(mapper.writeValueAsString(response));
                        } catch (JsonProcessingException e) {
                            logger.error("", e);
                            response.status = Response.STATUS_ERROR;
                            response.j = e.getMessage();
                            try {
                                reply.setText(mapper.writeValueAsString(response));
                            } catch (Exception ex) {
                                logger.error("", e);
                                return;
                            }
                        }

                        reply.setJMSCorrelationID(message.getJMSMessageID());
                        producer.send(replyQueue, reply);
                        session.commit();
                    }
                } catch (javax.jms.JMSException jmse) {
                    logger.error("", jmse);
                }
            } catch (java.lang.RuntimeException rte) {
                logger.error("", rte);
            } finally {
                availableSessions.add(id);
            }
        }

        private String getMessageText(Message msg) throws JMSException {
            if (msg instanceof TextMessage) {
                return ((TextMessage) msg).getText();
            }
            return msg.getStringProperty("text");
        }

    }

}
